import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class Minus {

    public static void main(String[] args) throws Exception
    {



    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    DataSet<Order> ds1 = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(3L, "rubber", 2),
            new Order(1L, "diaper", 4)
    ));


    DataSet<Order> ds2 = env.fromCollection(Arrays.asList(
            new Order(2L, "beer", 36),
            new Order(3L, "rubber", 2),
            new Order(2L, "diaper", 34)
    ));




    Table left =  tEnv.fromDataSet(ds1, "user,product,amount");
    Table right = tEnv.fromDataSet(ds2, "user,product,amount");
    Table result = left.minus(right);
    tEnv.toDataSet(result, Row.class).print();
}

}